初识产品

该部分主要描述CEDA各个组件的安装配置使用以及代码示例等内容

第一步:CEDA产品环境配置
第二步:导入Sample代码
第三步:初识CEDA Server/Client API
第四步:初识消息中间件AMQ
第五步:初始注册服务器
第六步:初识安全通讯服务器ACS
第七步:初识客户端框架



CEDA产品环境配置

操作系统要求

ATF客户端系统要求:Windows XP/Windows 7, .net framework 2.0
sp2/.net framework 3.5sp1
服务器端系统要求:Linux(CentOS5.4 或 RHEL5.4), Java 6



开发环境

Java:JDK 6, Eclipse 3.2
C++:VisualStudio 2008
C#:VisualStudio 2008



导入Sample代码

Java

1.从Eclipse导入工程
2.选择next
3.选择sample代码所在的文件夹
4.点击finish,完成代码导入



C#

安装了Visual Studio 2008后直接点击例子里面的*.csproj文件, 就可以打开sample代码.

初识CEDA Server/Client API

下载和安装


API集成在CEDA的安装包中,下载和安装请见CEDA安装包的下载页面。



初始化server

server需要实现接口IServerConnectionListener:

publicclass P2PPublisher implements IServerConnectionListener


设置IP地址和端口, 启动server:

// 设置server参数
ServiceInfo serviceInfo = new ServiceInfo();
serviceInfo.setHost(host);
serviceInfo.setPort(port);
// 启动server
serverHandler = ServiceManager.getInstance().startServer(
serviceInfo, this);


消息处理器:

class MessageHandler implements IConnectListener


client验证时创建对应的消息处理器:

publicboolean userValidation(UserInfo arg0, IServerConnectionconnHandler)
{
MessageHandler msgHandler = new MessageHandler();
connHandler.setListener(msgHandler);
returntrue;
}
publicboolean userValidation(ClientInfo arg0, IServerConnectionconnHandler)
{
MessageHandler msgHandler = new MessageHandler();
connHandler.setListener(msgHandler);
returntrue;
}



client连接到server

client需要实现2个接口:

1 IEventListener: 监听与server的连接信息
2 IMessageListener: 从server接收消息

publicclass P2PSubscriber implements IMessageListener, IEventListener


设置参数, 连接到server:

info.setUser("test", "test");ClientInfo info = new ClientInfo();
info.setUser("test", "test");
// 设置服务端的IP地址和端口号
info.setAddress("serverHost", "serverPort");
// 创建连接
IClientConnection conn = ClientConnectionFactory
.createConnection(info);
conn.addEventListener(this);
conn.start();


client订阅消息

// 订阅topic
IClientSession session = conn.createSession();
consumer = session.createConsumer(new Destination(topic));
consumer.addMessageListener(this);



server发布消息

在消息处理器里处理client的订阅与退订:

publicvoid onSubscribe(List topicList,
IServerConnection connHandler) {
serverHandler.subscribe(topicList, connHandler);
}
publicvoid onUnSubscribe(List topicList, IServerConnection connHandler)
{
serverHandler.unSubscribe(topicList, connHandler);
}


构建消息并发布:

// 构建消息
Message msg = new Message();
msg.setDestination(new Destination(topic));
msg.getMessageBody().addString((short) 1, content);
msg.getMessageBody().addBytes((short) 2, buf);
// 发送消息
serverHandler.sendMessage(msg);


client request


先订阅了请求消息的topic:

// 订阅topic
IClientSession session = conn.createSession();
MessageConsumer consumer = session.createConsumer(new Destination( topic));
consumer.addMessageListener(this);


然后发送请求消息:

// 发送request
MessageSender sender = session.createProducer();
requestCount++;
// 构造request消息
Message msgRequest = new Message();
msgRequest.setDestination(new Destination(topic));
msgRequest.setMessageID(requestCount);
msgRequest.setMessageBody(new MessageBody());
// 设置request参数
msgRequest.getMessageBody().addInt((short) 1, 1);
msgRequest.getMessageBody().addInt((short) 2, 2);
// 发送request消息
sender.send(msgRequest);


在onMessage接口里面获取reply消息:

publicvoid onMessage(Message message) {
if (message != null) {
try {
System.out.printf("Receive a message : id=%d,
result=%d",message.getMessageID(),
message.getMessageBody().getInt((short) 1));
} catch (MessageBodyException e) {
e.printStackTrace();
}
}
}


server reply

在MessageHandler的onMessage接口里面接收client request的消息, 然后生成reply 消息给client:

// 从request消息生成reply消息
Message msgReply = msgRequest.createReplyMessage();
// 获取请求参数
int arg1 = msgRequest.getMessageBody().getInt((short) 1);
int arg2 = msgRequest.getMessageBody().getInt((short) 2);
int result = arg1 + arg2;
// 设置结果
msgReply.getMessageBody().addInt((short) 1, result);
// 返回reply消息
connHandler.sendMessage(msgReply);



示例代码

完整示例代码在目录/example/java/01 cedaapi/下:

client订阅消息 ats/example/java/P2PSubscriber.java
server发布消息  ats/example/java/P2PPublisher.java
 client request  ats/example/java/P2PRequestClient.java
 server reply  ats/example/java/P2PReplyServer.java



初识消息中间件AMQ



下载和安装

下载和安装最新版本的AMQ,请见下载页面。



连接AMQ

使用CEDA api连接到AMQ中间件:

ClientInfo info = new ClientInfo();
info.setUser("test", "test");
// 设置AMQ的IP地址和端口号
info.setAddress(host, port);
// 创建连接
IClientConnection conn = ClientConnectionFactory
.createConnection(info);
conn.addEventListener(this);
conn.start();


订阅消息



创建session和consumer, 订阅消息:

// 订阅topic,接收消息
IClientSession session = conn.createSession();
MessageConsumer consumer = session.createConsumer(new Destination(topic));
consumer.addMessageListener(this);


在onMessage接口处理接收到的消息:

publicvoid onMessage(Message message)
{
if (message != null)
{
try {
System.out.println("Receive a message : "
+ message.getMessageBody().getString((short) 1));
}
catch (MessageBodyException e) {
e.printStackTrace();
}
}
}



发布消息

登录上AMQ后创建session和producer:

// 创建Session和Sender
session = conn.createSession();
sender = session.createProducer();



构建消息, 并发布到AMQ上:

// 构建消息
Message msg = new Message();
msg.setDestination(new Destination(topic));
msg.setMessageBody(new MessageBody());
msg.getMessageBody().addString((short) 1, content);
msg.getMessageBody().addBytes((short) 2, buf);
// 发送消息
sender.send(msg);



示例代码

完整示例代码在目录/example/java/ 02 amqclient/下:

订阅消息 ats/example/java/MQPublisher.java
 发布消息 ats/example/java/MQSubscriber.java



初始注册服务器

下载和安装

注册服务器集成在CEDA的安装包中,下载和安装请见CEDA安装包的下载页面。



连接注册服务器

为了连接到注册服务器, 需要实现以下2个接口:
1.IEventListener: 应用和注册服务器的连接状态信息
2.IClusterListener: 注册应用变动信息

初始化:

serviceInfo = new ServiceInfo();
serviceInfo.setHost("127.0.0.1");
// 使用register server balance模式
serviceInfo.setType(ServiceInfo.SERVICE_TYPE_STANDBY);
serviceInfo.setName(this.getClass().getSimpleName());
// 监听连接到register server的事件
ServiceManager.getInstance().addRegisterEventListener(this);


目前支持standby(主备机)和load balance(负载均衡)2种模式:

standby模式:

serviceInfo.setType(ServiceInfo.SERVICE_TYPE_STANDBY);

load balance模式:

serviceInfo.setType(ServiceInfo.SERVICE_TYPE_BLANCE);

创建集群:

// 创建集群
registerClient = ServiceManager.getInstance().createClustClient(
serviceInfo.getName());
// 监听集群事件
registerClient.addListener(this);



连接到注册服务器:

ServiceManager.getInstance().connectRegister(host + ":" + port);



注册服务

在IEventListener的onEvent方法里面监听连接注册服务器事件:

publicvoid onEvent(int nCode) {
switch (nCode) {
case IEventListener.CONNECTION_CONNECTED:
System.out.println("Service: register CONNECTION_CONNECTED");
// 连接后注册服务
registerService();
break;
case IEventListener.CONNECTION_CLOSED:
System.out.println("Service: register CONNECTION_CLOSED");
break;
}
}



连接上注册服务器后, 注册应用提供的服务:

registerClient.registService(serviceInfo);



启动服务

standby模式:

注册服务后, 从IClusterListener的onClustChange里监听所有服务的变动情况(新注册服务, 已注册的服务退出等) 服务变动时, 比较应用自己id与work service的id, 如果相同, 则启动服务:

// 获取work service列表
List infoList = registerClient.getWorkService();
for (ServiceInfo sinfo : infoList) {
// 如果是主server,则启动服务
if (sinfo.getSequenceName().equals(serviceInfo.getSequenceName())) {
// 启动服务
if (!isStart) {
System.out.println("start server: "
+ sinfo.getSequenceName());
isStart = true;
}
}
}


load balance模式:

balance模式在初始化后, 先启动服务, 然后再连接注册服务器注册, 不再需要监听onClustChange事件来启动服务.


示例代码

完整示例代码在目录/example/java/03 RegistryServer/下:


 standby服务器端 ats/example/java/StandbyServer.java
 balance服务器端 ats/example/java/BalanceServer.java
 standby客户端 ats/example/java/StandbyClient.java
 balance客户端 ats/example/java/BalanceClient.java



初识安全通讯服务器ACS

下载和安装

安全通讯服务器集成在CEDA的安装包中,下载和安装请见CEDA安装包的下载页面。



ACS作为消息中间件

ACS可以作为消息中间件, 类似于AMQ功能. 消息订阅者和发布者需要以http方式连接上:

ClientInfo info = new ClientInfo();
info.setUser("test", "test");
// 设置ACS的url和端口号
info.setAddress(host, port);
// 设置连接协议,ACS使用http/https
info.setProtocol(ClientInfo.PROTOCOL_HTTP);

ACS的host是一个url.



ACS作为通讯中转服务器

后台服务注册到registry server, 配置ACS连接到相同的registry server, 并且该服务配置到ACS的服务列表里面:

[Register]
#registry server地址,多个地址用“,”分隔
Address=192.168.1.111:2182

#需要连接的Service
RequestService=ACSReplyServer,ACSPublishServer


client通过http方式连接上ACS server, 在建立连接的时候, 设置需要连接的后台服务:

// 创建连接
conn = ClientConnectionFactory.createConnection(info);
// 设置要连接的服务名称
conn.setMQServer(ACSReplyServer.class.getSimpleName());
conn.addEventListener(this);
conn.start();

以后该连接调用的服务就是后台对应的服务.



示例代码

完整示例代码在目录/example/java/ 04 ACS/下:

 作为消息中间件的订阅/发布例子 : ats/example/java/ACSPublisher.java
  ats/example/java/ACSSubscriber.java

 作为中转服务器的request/reply例子: ats/example/java/ACSReplyServer.java
  ats/example/java/ACSRequestClient.java

 作为中转服务器的订阅/发布例子 : ats/example/java/ACSPublishServer.java
 ats/example/java/ACSSubscribeClient.java



初识客户端框架

下载和安装

客户端框架集成在CEDA的安装包中,下载请见CEDA安装包的下载页面, 客户端框架中CEDA安装包根目录下/atf文件夹下.



配置

需要根据服务器配置修改/atf/control/SampleLocalEnvSetting.xml中的服务器地址:



运行

双击/atf文件夹下的AtfLite.exe就可以打开框架

编写插件(Plugin)

SimplePlugin

插件开发

1 Visual Studio新建一个Windows Forms的class library工程
2 引用必需的dll(可知/atf/bin下找到):
Atf.Plugin.dllDevExpress.Data.v12.1.dllDevExpress.Utils.v12.1.dllDevExpress.XtraEditors .v12.1.dll
3 新建一个UserControl, 把继承的类由UserControl改成Atf.Plugin.Imp.IPluginDevImpl



插件集成到客户端框架

 1.布局界面, 界面逻辑编码等
2 在/atf/control/AppModules.xml里面增加一个plugin
3.在/atf/control/AppModules.xml里面增加一个module
4 运行框架, 就可以看到插件在左边侧栏里面的模块下面
5.双击打开插件



Market Data Client

这是一个初始化使用request/reply从后台获取数据, 后续通过订阅/发布消息实现更新的例子. 后台服务代码见目录/example/java/ 05MarketDataServer/.

 1.布局界面
2.点击获取价格, 先创建消息处理器MessageHandler,订阅消息:

if (msgHandler == null)
{
msgHandler = MQIIIManager.Instance.RegisterHandler(this.serverName, this.GetType().Name);
msgHandler.MQMessage += newMessageHandler.OnMQMessageDelegate(msgHandler_MQMessage);
}
//订阅主题
msgHandler.SubscribeTopics(newstring[] /p>
{ this.topic });
然后启动线程, 从后台获取初始化数据:
//获取初始化数据,需要在线程里进行
ThreadPool.QueueUserWorkItem(newWaitCallback(RequestInitData));


使用request/reply方式从后台获取初始化数据:

//构建请求消息

com.adaptiveMQ2.message.Message msg = new com.adaptiveMQ2.message.Message();
msg.Destination = new com.adaptiveMQ2.message.BaseDestination(this.topic);
msg.MessageBody = new com.adaptiveMQ2.message.MessageBody();
msg.SvrID = this.serverName;
//请求数据
com.adaptiveMQ2.message.Message replyMsg = MQIIIManager.Instance.RequestMessage(this.serverName, msg);
//处理返回的数据
OnData(replyMsg);


3.订阅的实时消息到来时的处理:

沪公网安备 31011502002921号      技术支持 - 上海子午线新荣科技有限公司 | 产品授权